package Transform

import Source.SourceTest.SensorReading
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._

object TransformTest {

  // 1.测试滚动聚合算子（Rolling Aggregation）(数据源源不断)
  //    1.min()   2.sum()   3.max()   4.minBy()   5.maxBy()
  def main(args: Array[String]): Unit = {

    // 1.读取sensor
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

//    val inputPath: String = "E:\\Flinklearn\\Flink\\resources\\data1\\sensor.txt"

    val inputPath: String = "hdfs://192.168.25.10:9000/Output/WordCount/part-r-00000-70129976-eb5e-4338-b557-d76c1cbb6ab4.csv"

    val inputDataStream: DataStream[String] = env.readTextFile(inputPath)

    // 2.map 转换成样例类类型
    val ds1: DataStream[SensorReading] = inputDataStream.map(
      data => {
        val fields: Array[String] = data.split(",")
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })
     ds1.print()

    // 3.测试min keyBy 分组聚合，输出每个传感器当前最小值

    val ds2: DataStream[SensorReading] = ds1.keyBy(0).minBy(2)
    //    ds2.print()

    // 4.任务2 输出当前时间(时间戳最大)的最小温度值，要用reduce
    //    reduce(fun:(T1,T2) => T3)
    // 遍历整个数据 T1代表之前的数据 T2代表遍历到的新数据 [替换 解决min时改变一整条数据]，T3代表结果

    // (1) 第一种reduce
    val result: DataStream[SensorReading] = ds1.keyBy(0).reduce((curstate, newData) =>
      SensorReading(curstate.id, newData.timestamp, curstate.temperature.min(newData.temperature))
    )
    //    result.print()

    // (2) 第二种reduce

    val res2: DataStream[SensorReading] = ds1.keyBy(0).reduce(new MyReduceFunction)
    // res2.print()

    // 5.多流转换操作
    // 5.1 分流，将传感器温度数据分成低温、高温两条流

    val ss1: SplitStream[SensorReading] = ds1.split(
      data => {
        if (data.temperature > 30) Seq("high") else Seq("low")
      }
    )
    val highTemp: DataStream[SensorReading] = ss1.select("high")
    val lowTemp: DataStream[SensorReading] = ss1.select("low")
    val allTemp: DataStream[SensorReading] = ss1.select("high","low")
//    highTemp.print("high")
//    lowTemp.print("low")

    // 5.2 合流 connect 实现高温预警
    val warningStream: DataStream[(String, Double)] = highTemp.map(data => (data.id, data.temperature))
    val connectedStreams: ConnectedStreams[(String, Double), SensorReading] = warningStream.connect(lowTemp)

    // 用coMap对数据进行分别处理
    val coMapResultStream: DataStream[Product] = connectedStreams.map(
      warningStream => (warningStream._1, warningStream._2, "warning"),
      lowTemp => (lowTemp.id, "healthy")
    )

//    coMapResultStream.print("coMap")

    // 5.3 union 合流
    val unionStream: DataStream[SensorReading] = highTemp.union(lowTemp, allTemp)

//    unionStream.print("unionStream")


    // final 执行程序
    env.execute()
  }

  class MyReduceFunction extends ReduceFunction[SensorReading] {
    override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
      SensorReading(t.id, t1.timestamp, t.temperature.min(t1.temperature))
    }
  }

}
